Custom Module Tutorial: Developing a Module That Renames Data Columns (Super Detailed Edition)


(iQuant) #1

This article shows, in detail, how to develop a custom module. We want to build a module that renames the columns of a dataset:

  • Input: one input data source, the data to process
  • Parameter: a column-name mapping, in the format old_name1:new_name1|old_name2:new_name2|old_name3:new_name3|…
  • Output: one output data source, the result data
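The mapping format above can be parsed into a dict with one line of Python. The helper name below is our own, for illustration only:

```python
def parse_column_mapping(mapping):
    """Parse 'old1:new1|old2:new2|...' into a dict of old -> new names."""
    # Split on '|' to get pairs, then on the first ':' within each pair.
    return dict(pair.split(':', 1) for pair in mapping.split('|'))

print(parse_column_mapping('avg_amount_0:new_avg_amount_0|close:new_close'))
# → {'avg_amount_0': 'new_avg_amount_0', 'close': 'new_close'}
```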

1. Prepare input data for development and testing

To have test data on hand for development and debugging, create a new blank visual strategy, drag in a Data Source module, and configure it as follows to serve as test data:

  • Datasource ID: bar1d_CN_STOCK_A
  • Start date: 2018-01-01
  • End date: 2018-02-01

The data looks like this:

2. Add the custom module

From the custom modules panel on the left, drag a Custom Python Module onto the canvas and connect it as shown below.

3. Code development 1: develop and test in a code cell

For ease of development, create a new code cell and iterate there, developing and testing as you go.

Copy the code from the custom module's main function into the code cell:

4. Code development 2: define the interface

The code is as follows:

  • input_ds: input data source
  • columns: column-name mapping
  • Outputs.data: output data source
# Rename data columns
def bigquant_run(input_ds, columns):
    # Sample code below. Write your code here
    # df = pd.DataFrame({'data': [1, 2, 3]})
    # data_1 = DataSource.write_df(df)
    # data_2 = DataSource.write_pickle(df)
    return Outputs(data=None)

# Test on m18
bigquant_run(m18.data, 'avg_amount_0:new_avg_amount_0|rank_return_0/rank_return_5:return_change_5')

5. Code development 3: implementation

# Rename data columns
def bigquant_run(input_ds, columns):
    # Parse the column mapping into a dict. TODO: validate the input
    columns = dict(c.split(':') for c in columns.split('|'))
    print('Column mapping:', columns)
    # Output data source
    dataset_ds = DataSource()
    output_store = dataset_ds.open_df_store()

    for key, df in input_ds.iter_df():
        df.columns = [columns.get(c, c) for c in df.columns]
        df.to_hdf(output_store, key)
        row_count = len(df)
        print('%s: %s' % (key, row_count))

    dataset_ds.close_df_store()
    return Outputs(data=dataset_ds)

# 6. Test on m4
mx = bigquant_run(m4.data, 'avg_amount_0:new_avg_amount_0|rank_return_0/rank_return_5:return_change_5')
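DataSource and Outputs are platform APIs that only exist inside the strategy environment. The core renaming logic, however, can be exercised standalone with pandas; the column names below are made up for illustration:

```python
import pandas as pd

def rename_columns(df, columns):
    # Parse 'old:new|old2:new2' into a dict, then rename any matching columns;
    # columns not listed in the mapping keep their original names.
    mapping = dict(c.split(':') for c in columns.split('|'))
    out = df.copy()
    out.columns = [mapping.get(c, c) for c in out.columns]
    return out

df = pd.DataFrame({'close': [10.0, 10.5], 'volume': [100, 200]})
out = rename_columns(df, 'close:new_close')
print(list(out.columns))  # → ['new_close', 'volume']
```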

7. Run the test and inspect the renamed data

8. Custom module: copy the code into the main function

9. Custom module: set the module interface

Note: the module parameters field uses JSON dict syntax (no trailing commas allowed; this differs from Python dicts).
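The difference can be checked with Python's standard json module (the platform's own parser may be more lenient, so treat this as a general JSON illustration):

```python
import json

# JSON requires lowercase true/false:
params = json.loads('{"columns": "close:new_close", "keep_old_columns": true}')
print(params['keep_old_columns'])  # → True

# A trailing comma, legal in a Python dict literal, is a JSON syntax error:
try:
    json.loads('{"columns": "close:new_close",}')
except json.JSONDecodeError:
    print('invalid JSON')
```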

10. Run the custom module

11. Inspect the custom module's output

12. Save the module

  1. Right-click the custom module
  2. Choose "New Module"
  3. Set the module id, which must be in English: rename_columns
  4. Set the module info
    • Change the display name to something friendlier
    • Open source: if checked, others can see this module's source code; for this tutorial, check it
  5. Note: refresh the page and you will see the newly created module

13. Use the new module

In the same experiment, drag in the custom module, connect the data source, configure the parameters as shown below, and run it to get the result.

14. Congratulations, you can now create your own modules and extend the platform. The full code follows:

    In [18]:
    # This code was auto-generated by the visual strategy environment on 2018-08-03 23:53
    # This code cell can only be edited in visual mode. You can also copy the code into a new code cell or strategy and modify it there.
    
    
    m4 = M.use_datasource.v1(
        datasource_id='bar1d_CN_STOCK_A',
        start_date='2018-01-01',
        end_date='2018-02-01'
    )
    
    # Rename data columns
    def m19_run_bigquant_run(input_ds, columns, keep_old_columns):
        # Parse the column mapping into a dict. TODO: validate the input
        columns = dict(c.split(':') for c in columns.split('|'))
        print('Column mapping:', columns)
        # Output data source
        dataset_ds = DataSource()
        output_store = dataset_ds.open_df_store()
    
        for key, df in input_ds.iter_df():
            old_column_set = set(df.columns)
            for old_col, new_col in columns.items():
                if old_col not in old_column_set:
                    print('Warning: column %s does not exist' % old_col)
            if keep_old_columns:
                for old_col, new_col in columns.items():
                    if old_col in old_column_set:
                        df[new_col] = df[old_col]
            else:
                df.columns = [columns.get(c, c) for c in df.columns]
            df.to_hdf(output_store, key)
            row_count = len(df)
            print('%s: %s' % (key, row_count))
    
        dataset_ds.close_df_store()
        return Outputs(data=dataset_ds)
    
    # Post-run function, optional. Its input is the main function's output; you can process the data here or return a friendlier outputs format. Its output is not cached.
    def m19_post_run_bigquant_run(outputs):
        return outputs
    
    m19 = M.cached.v3(
        input_1=m4.data,
        run=m19_run_bigquant_run,
        post_run=m19_post_run_bigquant_run,
        input_ports='input_ds',
        params="""{
        'columns': 'deal_number:new_deal_number|close:new_close',
        'keep_old_columns': true
    }
    """,
        output_ports='data'
    )
    
    m2 = M.rename_columns.v5(
        input_ds=m4.data,
        columns='deal_number:new_deal_number|close:new_close',
        keep_old_columns=True
    )
    
    [2018-08-03 23:51:05.884144] INFO: bigquant: use_datasource.v1 started running..
    [2018-08-03 23:51:06.072156] INFO: bigquant: cache hit
    [2018-08-03 23:51:06.082901] INFO: bigquant: use_datasource.v1 finished running [0.198757s].
    [2018-08-03 23:51:06.097946] INFO: bigquant: rename_columns.v5 started running..
    Column mapping: {'close': 'new_close', 'deal_number': 'new_deal_number'}
    /data: 80837
    [2018-08-03 23:51:06.563193] INFO: bigquant: rename_columns.v5 finished running [0.465224s].
    
    In [5]:
    m4.data.read_df().head()
    
    Out[5]:
    adjust_factor amount close instrument deal_number date high low open turn volume
    0 15.185156 96262413.0 42.973991 002505.SZA 12795 2018-01-02 43.277695 42.518436 43.125843 0.621448 34085236.0
    1 1.551832 998885620.0 9.993798 600050.SHA 54576 2018-01-02 10.009316 9.838614 9.854134 0.735207 155838868.0
    2 11.215669 100816918.0 234.071014 600993.SHA 5385 2018-01-02 234.519638 230.033371 230.706314 1.129214 4858288.0
    3 3.924462 207948497.0 129.114807 300496.SZA 9120 2018-01-02 129.821198 126.249939 127.545013 2.843751 6344324.0
    4 14.334615 219251624.0 269.347412 300142.SZA 6859 2018-01-02 270.637543 257.736389 258.023071 0.876127 11827733.0
    In [11]:
    m19.data.read_df().head()
    
    Out[11]:
    adjust_factor amount close instrument deal_number date high low open turn volume new_close new_deal_number
    0 15.185156 96262413.0 42.973991 002505.SZA 12795 2018-01-02 43.277695 42.518436 43.125843 0.621448 34085236.0 42.973991 12795
    1 1.551832 998885620.0 9.993798 600050.SHA 54576 2018-01-02 10.009316 9.838614 9.854134 0.735207 155838868.0 9.993798 54576
    2 11.215669 100816918.0 234.071014 600993.SHA 5385 2018-01-02 234.519638 230.033371 230.706314 1.129214 4858288.0 234.071014 5385
    3 3.924462 207948497.0 129.114807 300496.SZA 9120 2018-01-02 129.821198 126.249939 127.545013 2.843751 6344324.0 129.114807 9120
    4 14.334615 219251624.0 269.347412 300142.SZA 6859 2018-01-02 270.637543 257.736389 258.023071 0.876127 11827733.0 269.347412 6859
    In [ ]:
    print(M.rename_columns.v1.m_sourcecode[0][1])
    
    In [15]:
    # Rename data columns
    def bigquant_run(input_ds, columns, keep_old_columns):
        # Parse the column mapping into a dict. TODO: validate the input
        columns = dict(c.split(':') for c in columns.split('|'))
        print('Column mapping:', columns)
        # Output data source
        dataset_ds = DataSource()
        output_store = dataset_ds.open_df_store()
    
        for key, df in input_ds.iter_df():
            old_column_set = set(df.columns)
            for old_col, new_col in columns.items():
                if old_col not in old_column_set:
                    print('Warning: column %s does not exist' % old_col)
            if keep_old_columns:
                for old_col, new_col in columns.items():
                    if old_col in old_column_set:
                        df[new_col] = df[old_col]
            else:
                df.columns = [columns.get(c, c) for c in df.columns]
            df.to_hdf(output_store, key)
            row_count = len(df)
            print('%s: %s' % (key, row_count))
    
        dataset_ds.close_df_store()
        return Outputs(data=dataset_ds)
    
    # Test on m4
    mx = bigquant_run(m4.data, 'deal_number:new_deal_number|close:new_close', keep_old_columns=True)
    
    Column mapping: {'close': 'new_close', 'deal_number': 'new_deal_number'}
    /data: 80837
    
    In [16]:
    mx.data.read_df().head()
    
    Out[16]:
    adjust_factor amount close instrument deal_number date high low open turn volume new_close new_deal_number
    0 15.185156 96262413.0 42.973991 002505.SZA 12795 2018-01-02 43.277695 42.518436 43.125843 0.621448 34085236.0 42.973991 12795
    1 1.551832 998885620.0 9.993798 600050.SHA 54576 2018-01-02 10.009316 9.838614 9.854134 0.735207 155838868.0 9.993798 54576
    2 11.215669 100816918.0 234.071014 600993.SHA 5385 2018-01-02 234.519638 230.033371 230.706314 1.129214 4858288.0 234.071014 5385
    3 3.924462 207948497.0 129.114807 300496.SZA 9120 2018-01-02 129.821198 126.249939 127.545013 2.843751 6344324.0 129.114807 9120
    4 14.334615 219251624.0 269.347412 300142.SZA 6859 2018-01-02 270.637543 257.736389 258.023071 0.876127 11827733.0 269.347412 6859
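As the output above shows, with keep_old_columns the module copies each mapped column under its new name instead of renaming it in place, so both old and new columns appear. The branch can be sketched standalone with pandas (column names and values here are for illustration only):

```python
import pandas as pd

def rename_columns(df, columns, keep_old_columns=False):
    # Parse 'old:new|old2:new2' into a dict of old -> new names.
    mapping = dict(c.split(':') for c in columns.split('|'))
    out = df.copy()
    if keep_old_columns:
        # Copy each mapped column under its new name; originals stay in place.
        for old_col, new_col in mapping.items():
            if old_col in out.columns:
                out[new_col] = out[old_col]
    else:
        out.columns = [mapping.get(c, c) for c in out.columns]
    return out

df = pd.DataFrame({'close': [9.99], 'deal_number': [54576]})
out = rename_columns(df, 'close:new_close|deal_number:new_deal_number',
                     keep_old_columns=True)
print(list(out.columns))
# → ['close', 'deal_number', 'new_close', 'new_deal_number']
```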

    Custom module tutorial - demo module
    Custom module tutorial - add days to a date
    Custom module tutorial - select columns
    Custom module tutorial - concatenate data
    Custom module tutorial - sorting and stock-code filtering