Skip to content

重复运行一个reader就会报错 #24132

Closed
@weiexcelpro

Description

@weiexcelpro

用户重复运行case复现

###现象
用户描述:
aistudio有一个reader的问题
我创建了一个神经网络的类,并使用自己定义一个reader
然后我把神经网络封装在一个CNN的类里面,但是当我修改这个类并执行该类的cell之后
再运行就会报错
在本地notebook测试之后发现并没有该问题
如果有需要我可以提供源代码
问题很容易复现
使用官方提供的reader没有此问题
报错录屏: https://ecloud.baidu.com?t=a07b8d2113c3f8445036ba999c2ab3ee
##用户录屏分析

用户4:53 重启kernel 清除内核
图片
7:09运行 preTrain=True可以正常输出
图片
7:28用户再次运行也是可以输出正常
图片
7:56 用户重新运行cell2
图片
7:58 用户运行cell3 CNN定义类
图片
8:30 用户出现Error
用户强调什么没有操作或者删除了一行复现这个问题,再次强调本地是不会有这个问题的
cid:image007.png@01D61982.2053E420图片
推测:用户重复运行cell,没有清空之前的cell执行内容。
##aistudio复现过程
复现思路:
###aistudio运行多次官方的reader会不会报错
图片
依次执行上面cell,先执行官网的reader preTrain=false
图片
图片
####aistudio运行多次非官方的reader会不会报错
图片
图片
####本地运行多次官方的reader会不会报错
图片
图片
图片
####本地运行多次非官方的reader会不会报错
图片
图片
####aistudio 运行一次官方reader 一次非官方reader

####本地运行一次官方reader 一次非官方reader
图片


##结论
本地notebook和aistudio都会有这样的问题,运行多次 会报错。
报错异常栈信息:

/opt/conda/envs/python35-paddle120-env/lib/python3.7/site-packages/paddle/fluid/executor.py:782: UserWarning: The following exception is not an EOF exception.
  "The following exception is not an EOF exception.")
---------------------------------------------------------------------------RuntimeError                              Traceback (most recent call last)<ipython-input-7-a8e1aa90c962> in <module>
      1 cnn = CNN(16, 0.001, 1, use_cuda=False,  network=2,pre_train = False)
      2 
----> 3 cnn.train()
      4 cnn.drawTrainProcess("cost", "acc")
<ipython-input-4-051cd9ff1e89> in train(self, EPOCH_NUM)
    267                         program = self.test_program,
    268                         feed = self.__feeder.feed(data),
--> 269                         fetch_list = [self.avg_cost, self.acc]
    270                     )
    271                     test_costs.append(test_cost)
/opt/conda/envs/python35-paddle120-env/lib/python3.7/site-packages/paddle/fluid/executor.py in run(self, program, feed, fetch_list, feed_var_name, fetch_var_name, scope, return_numpy, use_program_cache)
    781                 warnings.warn(
    782                     "The following exception is not an EOF exception.")
--> 783             six.reraise(*sys.exc_info())
    784 
    785     def _run_impl(self, program, feed, fetch_list, feed_var_name,
/opt/conda/envs/python35-paddle120-env/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None
/opt/conda/envs/python35-paddle120-env/lib/python3.7/site-packages/paddle/fluid/executor.py in run(self, program, feed, fetch_list, feed_var_name, fetch_var_name, scope, return_numpy, use_program_cache)
    776                 scope=scope,
    777                 return_numpy=return_numpy,
--> 778                 use_program_cache=use_program_cache)
    779         except Exception as e:
    780             if not isinstance(e, core.EOFException):
/opt/conda/envs/python35-paddle120-env/lib/python3.7/site-packages/paddle/fluid/executor.py in _run_impl(self, program, feed, fetch_list, feed_var_name, fetch_var_name, scope, return_numpy, use_program_cache)
    829                 scope=scope,
    830                 return_numpy=return_numpy,
--> 831                 use_program_cache=use_program_cache)
    832 
    833         program._compile(scope, self.place)
/opt/conda/envs/python35-paddle120-env/lib/python3.7/site-packages/paddle/fluid/executor.py in _run_program(self, program, feed, fetch_list, feed_var_name, fetch_var_name, scope, return_numpy, use_program_cache)
    903         if not use_program_cache:
    904             self._default_executor.run(program.desc, scope, 0, True, True,
--> 905                                        fetch_var_name)
    906         else:
    907             self._default_executor.run_prepared_ctx(ctx, scope, False, False,
RuntimeError: boost::bad_get: failed value get using boost::get

##附用户源码:

#%%
import paddle as paddle
import paddle.fluid as fluid
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import os
import time

#%%
train_dataset = []
test_dataset = []
train_reader = paddle.batch(
    paddle.reader.shuffle(paddle.dataset.cifar.train10(),
                            buf_size=1024 * 100),
                    batch_size=1024
                )
for i, data in enumerate(train_reader()):
    if i == 4:
        break
    train_dataset.extend(data)
    test_dataset.extend(data)

    
def trainReader():
    for sample in train_dataset:
        yield sample
def testReader():
    for sample in test_dataset:
        yield sample

# %%
class CNN(object):

    def __init__(self,BATCH_SIZE, learning_rate, EPOCH_NUM, use_cuda, network = 0, pre_train = False):
        '''
        network 用于选择不同类型的网络

        '''
        self.__BATCH_SIZE_ = BATCH_SIZE
        self.__learning_rate_ = learning_rate
        self.__EPOCH_NUM_ = EPOCH_NUM
        self.__use_cuda_ = use_cuda
        self.__network_ = network
        self.__pre_train_ = pre_train
        # self.train_dataset = [] # 当num_data设定的有值的时候使用
        self.train_reader_, self.test_reader_ = self.__defReader()
        self.cnn_ = 0 # 无意义,用于声明
        self.__config()

        self.train_iters_ = []
        self.train_costs_ = []
        self.train_accs_ = []
        
        self.test_ids_ = []
        self.test_costs_ = []
        self.test_accs_ = []

    # 定义数据提供器
    def __defReader(self):
        '''
        定义数据提供器
        '''
        train_reader = paddle.batch(
            paddle.reader.shuffle(paddle.dataset.cifar.train10(),
                                    buf_size=self.__BATCH_SIZE_ * 100),
            batch_size=self.__BATCH_SIZE_
        )

        test_reader = paddle.batch(
            paddle.dataset.cifar.test10(),
            batch_size=self.__BATCH_SIZE_
        )
        
        if self.__pre_train_ == True:
            print("preTrain")
            train_reader = fluid.io.batch(
                fluid.io.shuffle(trainReader, buf_size=self.__BATCH_SIZE_ * 100),
                                    batch_size=self.__BATCH_SIZE_
                                )
            test_reader = fluid.io.batch(
                fluid.io.shuffle(testReader, buf_size=self.__BATCH_SIZE_ * 100),
                                    batch_size=self.__BATCH_SIZE_
                                )

        return (train_reader, test_reader)


    def __convolutionalNeuralNetwork2(self, img):
        # 第一个卷积-池化层
        conv_pool_1 = fluid.nets.simple_img_conv_pool(
            input=img,         # 输入图像
            filter_size=5,     # 滤波器的大小
            num_filters=20,    # filter 的数量。它与输出的通道相同
            pool_size=2,       # 池化核大小2*2
            pool_stride=2,     # 池化步长
            act="relu")        # 激活类型
        conv_pool_1 = fluid.layers.batch_norm(conv_pool_1)
        # 第二个卷积-池化层
        conv_pool_2 = fluid.nets.simple_img_conv_pool(
            input=conv_pool_1,
            filter_size=5,
            num_filters=50,
            pool_size=2,
            pool_stride=2,
            act="relu")
        return conv_pool_2


    # 搭建CNN网络
    def __convolutionalNeuralNetwork3(self,img):
        '''
        搭建CNN网络
        '''
        # 第一个卷积-池化层
        conv_pool_1 = fluid.nets.simple_img_conv_pool(
            input=img,         # 输入图像
            filter_size=5,     # 滤波器的大小
            num_filters=20,    # filter 的数量。它与输出的通道相同
            pool_size=2,       # 池化核大小2*2
            pool_stride=2,     # 池化步长
            act="relu")        # 激活类型
        conv_pool_1 = fluid.layers.batch_norm(conv_pool_1)
        # 第二个卷积-池化层
        conv_pool_2 = fluid.nets.simple_img_conv_pool(
            input=conv_pool_1,
            filter_size=5,
            num_filters=50,
            pool_size=2,
            pool_stride=2,
            act="relu")
        conv_pool_2 = fluid.layers.batch_norm(conv_pool_2)
        # 第三个卷积-池化层
        conv_pool_3 = fluid.nets.simple_img_conv_pool(
            input=conv_pool_2,
            filter_size=5,
            num_filters=50,
            pool_size=2,
            pool_stride=2,
            act="relu")

        return conv_pool_3

    # 搭建CNN网络
    def __convolutionalNeuralNetworkFc(self,img):
        
        net = self.__convolutionalNeuralNetwork3(img)

        ret = fluid.layers.fc(input = net,
                                size = 64,
                                act = "relu")
        
        return ret
    
    def __network(self, img):
        net = 0 # 用于声明
        if self.__network_ == 0:
            net = self.__convolutionalNeuralNetwork3(img)
        if self.__network_ == 1:
            net = self.__convolutionalNeuralNetworkFc(img)
        if self.__network_ == 2:
            net = self.__convolutionalNeuralNetwork2(img)
        
        # 以softmax为激活函数的全连接输出层,10类数据输出10个数字
        output = fluid.layers.fc(input=net, size=10, act='softmax')
        return output

    # 定义数据输入格式
    def __defInput(self):
        '''
        定义数据输入格式
        '''
        data_shape = [3, 32, 32]
        images = fluid.layers.data(
            name = 'images',
            shape = data_shape,
            dtype = 'float32'
        )
        label = fluid.layers.data(
            name = 'label',
            shape = [1],
            dtype = 'int64'
        )

        return (images, label)

    
    def __config(self):
        self.train_scope = fluid.core.Scope()
        with fluid.scope_guard(self.train_scope):
            # 输入
            images, label = self.__defInput()

            # 神经网络
            self.cnn_ = self.__network(images)

            # cost
            cost = fluid.layers.cross_entropy(
                input = self.cnn_,
                label = label
            )
            self.avg_cost = fluid.layers.mean(cost)

            # accuracy
            self.acc = fluid.layers.accuracy(input = self.cnn_, label = label)

            # 获取测试程序,必须要在设置优化方法之前调用
            self.test_program = fluid.default_main_program().clone(for_test=True)

            # optimizer
            optimizer = fluid.optimizer.Adam(learning_rate = self.__learning_rate_)
            opts = optimizer.minimize(self.avg_cost)
            
            # place
            place = fluid.CPUPlace()
            if self.__use_cuda_:
                place = fluid.CUDAPlace(0)
            self.__exe = fluid.Executor(place)

            # 初始化所有参数
            self.__exe.run(program=fluid.default_startup_program()) 
            
            # DataFeeder 负责将reader(读取器)返回的数据转成一种特殊的数据结构,
            # 使它们可以输入到 Executor
            self.__feeder = fluid.DataFeeder(
                place = place,
                feed_list = [images, label]
            )
    
    def preTrain(self, EPOCH_NUM = 0):
        self.__pre_train_ = True
        if EPOCH_NUM > 0:
            self.__EPOCH_NUM_ = EPOCH_NUM
        train_iter = 0
        self.train_iters_ = []
        self.train_costs_ = []
        self.train_accs_ = []
        
        self.test_ids_ = []
        self.test_costs_ = []
        self.test_accs_ = []
        with fluid.scope_guard(self.train_scope):
            start_time = time.time()
            for pass_id in range(self.__EPOCH_NUM_):
                for batch_id, data in enumerate(self.train_reader_()):
                    train_cost, train_acc = self.__exe.run(
                        program = fluid.default_main_program(),
                        feed = self.__feeder.feed(data),
                        fetch_list = [self.avg_cost, self.acc]
                    )

                    train_iter = train_iter + self.__BATCH_SIZE_
                    self.train_iters_.append(train_iter)
                    self.train_costs_.append(train_cost)
                    self.train_accs_.append(train_acc) 
                    #每100次batch打印一次训练、进行一次测试
                    if (batch_id + 1) % 200 == 0:                                       
                        print('Pass:%d, Batch:%d, Cost:%0.5f, Accuracy:%0.5f' % 
                        (pass_id, batch_id, train_cost[0], train_acc[0]))
                        
                print("Each epoch time: %0.3f", time.time() - start_time)
        
    def train(self,EPOCH_NUM = 0):
        if EPOCH_NUM > 0:
            self.__EPOCH_NUM_ = EPOCH_NUM
        train_iter = 0
        self.train_iters_ = []
        self.train_costs_ = []
        self.train_accs_ = []
        
        self.test_ids_ = []
        self.test_costs_ = []
        self.test_accs_ = []
        with fluid.scope_guard(self.train_scope):
            start_time = time.time()
            for pass_id in range(self.__EPOCH_NUM_):
                for batch_id, data in enumerate(self.train_reader_()):
                    train_cost, train_acc = self.__exe.run(
                        program = fluid.default_main_program(),
                        feed = self.__feeder.feed(data),
                        fetch_list = [self.avg_cost, self.acc]
                    )

                    train_iter = train_iter + self.__BATCH_SIZE_
                    self.train_iters_.append(train_iter)
                    self.train_costs_.append(train_cost)
                    self.train_accs_.append(train_acc)
                    #每100次batch打印一次训练、进行一次测试
                    if (batch_id + 1) % 100 == 0:                                             
                        print('Pass:%d, Batch:%d, Cost:%0.5f, Accuracy:%0.5f' % 
                        (pass_id, batch_id, train_cost[0], train_acc[0]))
                
                
                # 进行测试
                test_costs = []
                test_accs = []
                for batch_id, data in enumerate(self.test_reader_()):
                    test_cost, test_acc = self.__exe.run(
                        program = self.test_program,
                        feed = self.__feeder.feed(data),
                        fetch_list = [self.avg_cost, self.acc]
                    )
                    test_costs.append(test_cost)
                    test_accs.append(test_acc)
                self.test_ids_.append(pass_id)
                test_cost = (sum(test_costs) / len(test_costs))
                self.test_costs_.append(test_cost)
                test_acc = (sum(test_accs) / len(test_accs))
                self.test_accs_.append(test_acc)
                print('Test:%d, Cost:%0.5f, ACC:%0.5f' % (pass_id, test_cost, test_acc))
                print("Each epoch time: %0.3f", time.time() - start_time)
                
    def save(self, path):
        if not os.path.exists(path):
            os.makedirs(path)
        print("save model to %s" % (path))
        with fluid.scope_guard(self.train_scope):
            fluid.io.save_inference_model(path,
                                    ['images'],
                                    [self.cnn_],
                                    self.__exe)
        print("模型保存完成")

    def drawTrainProcess(self, label_cost, label_acc):
        '''
        tp = 0打印train的cost和acc
        tp = 1打印test的cost和acc
        '''
        plt.figure()
        plt.title("training", fontsize=24)
        plt.xlabel("iter", fontsize=20)
        plt.ylabel("cost/acc", fontsize=20)
        plt.plot(self.train_iters_, self.train_costs_, color='red', label=label_cost)
        plt.plot(self.train_iters_, self.train_accs_, color='green', label=label_acc) 
        plt.legend()
        plt.grid()
        plt.show()
        
        plt.figure()
        plt.title("testing", fontsize=24)
        plt.xlabel("passid", fontsize=20)
        plt.ylabel("cost/acc", fontsize=20)
        plt.plot(self.test_ids_, self.test_costs_, color='red', label=label_cost)
        plt.plot(self.test_ids_, self.test_accs_, color='green', label=label_acc) 
        plt.legend()
        plt.grid()

        plt.show()
    

#%%
class Inference(object):
    def __init__(self, use_cuda):
        self.__use_cuda_ = use_cuda
        self.infer_exe = fluid.Executor(self.__place())
        self.inference_scope = fluid.core.Scope()
        self.__label_list = [
        "airplane", "automobile", "bird", "cat", "deer", "dog", "frog", "horse",
        "ship", "truck"
        ]

    def __place(self):
        place = fluid.CPUPlace()
        if self.__use_cuda_:
            place.fluid.CUDAPlace(0)
        return place

    def __loadImage(self, img_path):
        img = Image.open(img_path)
        plt.imshow(img)
        plt.show()
        # 将图片调整为与训练数据一样大小 
        img = img.resize((32, 32), Image.ANTIALIAS)
        img = np.array(img).astype(np.float32)
        img = img.transpose((2, 0, 1))
        # 像素值归一化
        img = img / 255.0
        img = np.expand_dims(img, axis=0)
        return img

    def loadModel(self, path):
        with fluid.scope_guard(self.inference_scope):
                [self.inference_program, # 预测用的program
                    self.feed_target_names, # 一个str列表,包含需要在推理program中提供数据的变量的名称
                    self.fetch_targets] = fluid.io.load_inference_model(path,
                                                        self.infer_exe)
    
    def inference(self,img_path):
        img = self.__loadImage(img_path)
        with fluid.scope_guard(self.inference_scope):
            result = self.infer_exe.run(self.inference_program,
                                        feed = {self.feed_target_names[0]:img},
                                        fetch_list = self.fetch_targets)
            print("infer results: %s" % self.__label_list[np.argmax(result[0])])
        return result


# %%
cnn = CNN(16, 0.001, 10, use_cuda=False,  network=2,pre_train = False)

cnn.train()
cnn.drawTrainProcess("cost", "acc")

#%%
cnn = CNN(256, 0.001, 3, use_cuda=False, pre_train = False)
cnn.train()
cnn.drawTrainProcess("training", "cost", "acc")


#%%
path = "./models/catdog.inference.model"
cnn.save(path)


# %%
infer = Inference(use_cuda=False)
infer.loadModel(path)



#%%
cnnCpu = CNN(128, 0.01, 10, use_cuda=False, pre_train = True)
cnnCpu.train()
cnnCpu.drawTrainProcess("training", "cost", "acc")

# %%

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions