Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cli/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# disco models
**/models
2 changes: 1 addition & 1 deletion discojs-node/src/data/text_loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class TextLoader extends data.TextLoader<string> {
const inputFile = await fs.readFile(source)
const file = new tfData.FileDataSource(inputFile, { chunkSize: 1024 })
// TODO: reading files line by line is an issue for LLM tokenization
const dataset = new tfData.TextLineDataset(file).filter(s => s != ' ') // newline creates empty strings
const dataset = new tfData.TextLineDataset(file).filter(s => s !== ' ') // newline creates empty strings
return Promise.resolve(dataset)
}
}
2 changes: 1 addition & 1 deletion discojs-web/src/data/text_loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { data } from '@epfml/discojs'
export class TextLoader extends data.TextLoader<File> {
loadDatasetFrom (source: File): Promise<data.Dataset> {
const file = new tf.data.FileDataSource(source)
const dataset = new tf.data.TextLineDataset(file).filter(s => s != ' ') // newline creates empty strings
const dataset = new tf.data.TextLineDataset(file).filter(s => s !== ' ') // newline creates empty strings
return Promise.resolve(dataset)
}
}
130 changes: 74 additions & 56 deletions discojs-web/src/memory/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,73 @@ import { Map } from 'immutable'
import * as tf from '@tensorflow/tfjs'

import type { Path, Model, ModelInfo, ModelSource } from '@epfml/discojs'
import { Memory, ModelType, models } from '@epfml/discojs'
import { Memory, models } from '@epfml/discojs'

export class IndexedDB extends Memory {
override pathFor (source: ModelSource): Path {
override getModelMemoryPath (source: ModelSource): Path {
if (typeof source === 'string') {
return source
}

if (source.type === undefined || source.taskID === undefined || source.name === undefined) {
throw new TypeError('source incomplete')
}

const version = source.version ?? 0

return `indexeddb://${source.type}/${source.taskID}/${source.name}@${version}`
return `indexeddb://${source.type}/${source.tensorBackend}/${source.taskID}/${source.name}@${version}`
}

override infoFor (source: ModelSource): ModelInfo {
override getModelInfo (source: ModelSource): ModelInfo {
if (typeof source !== 'string') {
return source
}
const [stringType, taskID, fullName] = source.split('/').splice(2)
const [type, tensorBackend, taskID, fullName] = source.split('/').splice(2)

const type = stringType === 'working' ? ModelType.WORKING : ModelType.SAVED
if (type !== 'working' && type !== 'saved') {
throw Error("Unknown memory model type")
}

const [name, versionSuffix] = fullName.split('@')
const version = versionSuffix === undefined ? 0 : Number(versionSuffix)
return { type, taskID, name, version }
if (tensorBackend !== 'tfjs' && tensorBackend !== 'gpt') {
throw Error("Unknown tensor backend")
}
return { type, taskID, name, version, tensorBackend }
}

async getModelMetadata (source: ModelSource): Promise<tf.io.ModelArtifactsInfo | undefined> {
const models = await tf.io.listModels()
return models[this.pathFor(source)]
return models[this.getModelMemoryPath(source)]
}

async contains (source: ModelSource): Promise<boolean> {
return await this.getModelMetadata(source) !== undefined
}

override async getModel (source: ModelSource): Promise<Model> {
return new models.TFJS(await tf.loadLayersModel(this.pathFor(source)))
override async getModel(source: ModelSource): Promise<Model> {
const layersModel = await tf.loadLayersModel(this.getModelMemoryPath(source))

const tensorBackend = this.getModelInfo(source).tensorBackend
switch (tensorBackend) {
case 'tfjs':
return new models.TFJS(layersModel)
case 'gpt':
return new models.GPT(undefined, layersModel)
default: {
const _: never = tensorBackend
throw new Error('should never happen')
}
}
}

async deleteModel (source: ModelSource): Promise<void> {
await tf.io.removeModel(this.pathFor(source))
await tf.io.removeModel(this.getModelMemoryPath(source))
}

async loadModel (source: ModelSource): Promise<void> {
const src = this.infoFor(source)
if (src.type === ModelType.WORKING) {
async loadModel(source: ModelSource): Promise<void> {
const src = this.getModelInfo(source)
if (src.type === 'working') {
// Model is already loaded
return
}
await tf.io.copyModel(
this.pathFor(src),
this.pathFor({ ...src, type: ModelType.WORKING, version: 0 })
this.getModelMemoryPath(src),
this.getModelMemoryPath({ ...src, type: 'working', version: 0 })
)
}

Expand All @@ -75,91 +86,98 @@ export class IndexedDB extends Memory {
* @param model the model
*/
override async updateWorkingModel (source: ModelSource, model: Model): Promise<void> {
const src: ModelInfo = this.infoFor(source)
if (src.type !== undefined && src.type !== ModelType.WORKING) {
throw new Error('expected working model')
const src: ModelInfo = this.getModelInfo(source)
if (src.type !== 'working') {
throw new Error('expected working type model')
}

// Enforce version 0 to always keep a single working model at a time
const modelInfo = { ...src, type: 'working' as const, version: 0 }
let includeOptimizer;
if (model instanceof models.TFJS) {
await model.extract().save(this.pathFor({ ...src, type: ModelType.WORKING, version: 0 }), { includeOptimizer: true })
modelInfo['tensorBackend'] = 'tfjs'
includeOptimizer = true
} else if (model instanceof models.GPT) {
modelInfo['tensorBackend'] = 'gpt'
includeOptimizer = false // true raises an error
} else {
throw new Error('unknown model type')
}

// Enforce version 0 to always keep a single working model at a time
const indexedDBURL = this.getModelMemoryPath(modelInfo)
await model.extract().save(indexedDBURL, { includeOptimizer })
}

/**
* Creates a saved copy of the working model corresponding to the source.
* @param source the source
*/
async saveWorkingModel (source: ModelSource): Promise<Path> {
const src: ModelInfo = this.infoFor(source)
if (src.type !== undefined && src.type !== ModelType.WORKING) {
throw new Error('expected working model')
const src: ModelInfo = this.getModelInfo(source)
if (src.type !== 'working') {
throw new Error('expected working type model')
}
const dst = this.pathFor(await this.duplicateSource({ ...src, type: ModelType.SAVED }))
const dst = this.getModelMemoryPath(await this.duplicateSource({ ...src, type: 'saved' }))
await tf.io.copyModel(
this.pathFor({ ...src, type: ModelType.WORKING }),
this.getModelMemoryPath({ ...src, type: 'working' }),
dst
)
return dst
}

override async saveModel (source: ModelSource, model: Model): Promise<Path> {
const src: ModelInfo = this.infoFor(source)
if (src.type !== undefined && src.type !== ModelType.SAVED) {
throw new Error('expected saved model')
const src: ModelInfo = this.getModelInfo(source)
if (src.type !== 'saved') {
throw new Error('expected saved type model')
}
const dst = this.pathFor(await this.duplicateSource({ ...src, type: ModelType.SAVED }))

const modelInfo = await this.duplicateSource({ ...src, type: 'saved' })
let includeOptimizer;
if (model instanceof models.TFJS) {
await model.extract().save(dst, { includeOptimizer: true })
} else {
throw new Error('unknown model type')
}
return dst
modelInfo['tensorBackend'] = 'tfjs'
includeOptimizer = true
} else if (model instanceof models.GPT) {
modelInfo['tensorBackend'] = 'gpt'
includeOptimizer = false // true raises an error
} else {
throw new Error('unknown model type')
}
const indexedDBURL = this.getModelMemoryPath(modelInfo)
await model.extract().save(indexedDBURL, { includeOptimizer })
return indexedDBURL
}

/**
* Downloads the model corresponding to the source.
* @param source the source
*/
async downloadModel (source: ModelSource): Promise<void> {
const src: ModelInfo = this.infoFor(source)
const src: ModelInfo = this.getModelInfo(source)
await tf.io.copyModel(
this.pathFor(source),
this.getModelMemoryPath(source),
`downloads://${src.taskID}_${src.name}`
)
}

async latestDuplicate (source: ModelSource): Promise<number | undefined> {
if (typeof source !== 'string') {
source = this.pathFor({ ...source, version: 0 })
source = this.getModelMemoryPath({ ...source, version: 0 })
}

// perform a single memory read
const paths = Map(await tf.io.listModels())

if (!paths.has(source)) {
return undefined
}

const latest = Map(paths)
.keySeq()
.toList()
.map((p) => this.infoFor(p).version)
.max()

const latest = Map(paths).keySeq().toList()
.map((p) => this.getModelInfo(p).version).max()
if (latest === undefined) {
return 0
}

return latest
}

async duplicateSource (source: ModelSource): Promise<ModelInfo> {
const latestDuplicate = await this.latestDuplicate(source)
source = this.infoFor(source)
source = this.getModelInfo(source)

if (latestDuplicate === undefined) {
return source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ describe('text preprocessing', function () {
scheme: 'local',
dataType: 'text',
tokenizer: 'Xenova/gpt2',
tensorBackend: 'gpt'
}}
}

Expand Down
4 changes: 2 additions & 2 deletions discojs/src/dataset/data/preprocessing/text_preprocessing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ interface TokenizedEntry extends tf.TensorContainerObject {
const leftPadding: PreprocessingFunction = {
type: TextPreprocessing.LeftPadding,
apply: async (x: Promise<tf.TensorContainer>, task: Task): Promise<tf.TensorContainer> => {
if (x === undefined || !Array.isArray(x) || x.length == 0 || typeof(x[0] != 'number')) {
if (x === undefined || !Array.isArray(x) || x.length == 0 || typeof(x[0] !== 'number')) {
new Error("The leftPadding preprocessing expects a non empty 1D array of number")
}
const { tokens } = await x as TokenizedEntry
Expand Down Expand Up @@ -72,7 +72,7 @@ interface TokenizerOutput {
const tokenize: PreprocessingFunction = {
type: TextPreprocessing.Tokenize,
apply: async (x: Promise<tf.TensorContainer>, task: Task): Promise<tf.TensorContainer> => {
if (typeof x != 'string') {
if (typeof x !== 'string') {
new Error("The tokenize preprocessing expects a string as input")
}
const xs = await x as string // tf.TextLineDataset yields strings
Expand Down
2 changes: 1 addition & 1 deletion discojs/src/dataset/data_loader/image_loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export abstract class ImageLoader<Source> extends DataLoader<Source> {
const numberOfClasses = labelList.length
// Map label strings to integer
const label_to_int = new Map(labelList.map((label_name, idx) => [label_name, idx]))
if (label_to_int.size != numberOfClasses) {
if (label_to_int.size !== numberOfClasses) {
throw new Error("Input labels aren't matching the task LABEL_LIST")
}

Expand Down
3 changes: 2 additions & 1 deletion discojs/src/default_tasks/cifar10.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ export const cifar10: TaskProvider = {
clippingRadius: 20,
decentralizedSecure: true,
minimumReadyPeers: 3,
maxShareValue: 100
maxShareValue: 100,
tensorBackend: 'tfjs'
}
}
},
Expand Down
3 changes: 2 additions & 1 deletion discojs/src/default_tasks/lus_covid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ export const lusCovid: TaskProvider = {
clippingRadius: 20,
decentralizedSecure: true,
minimumReadyPeers: 2,
maxShareValue: 100
maxShareValue: 100,
tensorBackend: 'tfjs'
}
}
},
Expand Down
3 changes: 2 additions & 1 deletion discojs/src/default_tasks/mnist.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ export const mnist: TaskProvider = {
clippingRadius: 20,
decentralizedSecure: true,
minimumReadyPeers: 3,
maxShareValue: 100
maxShareValue: 100,
tensorBackend: 'tfjs'
}
}
},
Expand Down
3 changes: 2 additions & 1 deletion discojs/src/default_tasks/simple_face.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ export const simpleFace: TaskProvider = {
LABEL_LIST: ['child', 'adult'],
scheme: 'federated', // secure aggregation not yet implemented for federated
noiseScale: undefined,
clippingRadius: undefined
clippingRadius: undefined,
tensorBackend: 'tfjs'
}
}
},
Expand Down
3 changes: 2 additions & 1 deletion discojs/src/default_tasks/skin_condition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ export const skinCondition: TaskProvider = {
LABEL_LIST: LABELS,
scheme: 'federated',
noiseScale: undefined,
clippingRadius: undefined
clippingRadius: undefined,
tensorBackend: 'tfjs'
}
}
},
Expand Down
3 changes: 2 additions & 1 deletion discojs/src/default_tasks/titanic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ export const titanic: TaskProvider = {
],
scheme: 'federated', // secure aggregation not yet implemented for FeAI
noiseScale: undefined,
clippingRadius: undefined
clippingRadius: undefined,
tensorBackend: 'tfjs'
}
}
},
Expand Down
17 changes: 8 additions & 9 deletions discojs/src/default_tasks/wikitext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ export const wikitext: TaskProvider = {
dataType: 'text',
modelID: 'wikitext-103-raw-model',
preprocessingFunctions: [data.TextPreprocessing.Tokenize, data.TextPreprocessing.LeftPadding],
validationSplit: 0.2, // TODO: is this used somewhere? because train, eval and test are already split in dataset
epochs: 5,
scheme: 'federated',
noiseScale: undefined,
decentralizedSecure: true,
minimumReadyPeers: 3,
maxShareValue: 100,
roundDuration: 10,
batchSize: 16,
epochs: 5,
// Unused by wikitext because data already comes split
// But if set to 0 then the webapp doesn't display the validation metrics
validationSplit: 0.1,
roundDuration: 2,
batchSize: 1, // If set too high (e.g. 16) then firefox raises a WebGL error
tokenizer: 'Xenova/gpt2',
maxSequenceLength: 128
maxSequenceLength: 128,
tensorBackend: 'gpt'
}
}
},
Expand Down
3 changes: 1 addition & 2 deletions discojs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ export * as data from './dataset/index.js'
export * as serialization from './serialization/index.js'
export * as training from './training/index.js'
export * as privacy from './privacy.js'
export { GraphInformant } from './informant/index.js'

export * as client from './client/index.js'
export * as aggregator from './aggregator/index.js'

export { WeightsContainer, aggregation } from './weights/index.js'
export { AsyncInformant } from './async_informant.js'
export { Logger, ConsoleLogger } from './logging/index.js'
export { Memory, ModelType, type ModelInfo, type Path, type ModelSource, Empty as EmptyMemory } from './memory/index.js'
export { Memory, type ModelInfo, type Path, type ModelSource, Empty as EmptyMemory } from './memory/index.js'
export { Disco, RoundLogs } from './training/index.js'
export { Validator } from './validation/index.js'

Expand Down
Loading