开发者

Why is Erlang driver output different than expected?

I've tried to add support for 64-bit integers to my fork of Erlang SQLite 3 wrapper (since that's the main integer type used by SQLite 3). While it looked very simple, I've run into a problem.

This is the code for the C side of the driver with debugging statements (see https://github.com/alexeyr/erlang-sqlite3/tree/64bit for other files, but I think the problem is contained here):

#include "sqlite3_drv.h"

// Callback Array
static ErlDrvEntry basic_driver_entry = {
    NULL,                             /* init */
    start,                            /* startup (defined below) */
    stop,                             /* shutdown (defined below) */
    NULL,                             /* output */
    NULL,                             /* ready_input */
    NULL,                             /* ready_output */
    "sqlite3_drv",                    /* the name of the driver */
    NULL,                             /* finish */
    NULL,                             /* handle */
    control,                          /* control */
    NULL,                             /* timeout */
    NULL,                             /* outputv (defined below) */
    ready_async,                      /* ready_async */
    NULL,                             /* flush */
    NULL,                             /* call */
    NULL,                             /* event */
    ERL_DRV_EXTENDED_MARKER,          /* ERL_DRV_EXTENDED_MARKER */
    ERL_DRV_EXTENDED_MAJOR_VERSION,   /* ERL_DRV_EXTENDED_MAJOR_VERSION */
    ERL_DRV_EXTENDED_MAJOR_VERSION,   /* ERL_DRV_EXTENDED_MINOR_VERSION */
    ERL_DRV_FLAG_USE_PORT_LOCKING     /* ERL_DRV_FLAGs */
};

DRIVER_INIT(basic_driver) {
  return &basic_driver_entry;
}

// Driver Start
static ErlDrvData start(ErlDrvPort port, char* cmd) {
  sqlite3_drv_t* retval = (sqlite3_drv_t*) driver_alloc(sizeof(sqlite3_drv_t));
  struct sqlite3 *db = 0;
  int status = 0;

  retval->log = fopen ("/tmp/erlang-sqlite3-drv.log", "a+");
  if (!retval->log) {
    fprintf (stderr, "Can't create log file\n");
  }

  fprintf (retval->log, "--- Start erlang-sqlite3 driver\nCommand line: [%s]\n", cmd);


  const char *db_name = strstr (cmd, " ");
  if (!db_name) {
    fprintf (retval->log, "ERROR: DB name should be passed at command line\n");
    db_name = DB_PATH;
  } else {
    ++db_name;
  }

  // Create and open the database
  sqlite3_open(db_name, &db);
  status = sqlite3_errcode(db);

  if(status != SQLITE_OK) {
    fprintf(retval->log, "ERROR: Unabled to open file: %s because %s\n\n", DB_PATH, sqlite3_errmsg(db));
  } else {
    fprintf(retval->log, "Opened file %s\n", db_name);
  }

  // Set the state for the driver
  retval->port = port;
  retval->db = db;
  retval->key = 42; //FIXME: Just a magic number, make real key

#define STR_(ARG) #ARG
#define STR(ARG) STR_(ARG)
  retval->atom_error = driver_mk_atom ("error");
  retval->atom_columns = driver_mk_atom ("columns");
  retval->atom_rows = driver_mk_atom ("rows");
  retval->atom_null = driver_mk_atom (STR (NULL_ATOM));
  retval->atom_id = driver_mk_atom ("id");
  retval->atom_ok = driver_mk_atom ("ok");
  retval->atom_unknown_cmd = driver_mk_atom ("uknown_command");

  fflush (retval->log);
  return (ErlDrvData) retval;
}


// Driver Stop
static void stop(ErlDrvData handle) {
  sqlite3_drv_t* driver_data = (sqlite3_drv_t*) handle;

  sqlite3_close(driver_data->db);
  fclose (driver_data->log);
  driver_data->log = 0;

  driver_free(driver_data);
}

// Handle input from Erlang VM
static int control(ErlDrvData drv_data, unsigned int command, char *buf, 
                   int len, char **rbuf, int rlen) {
  sqlite3_drv_t* driver_data = (sqlite3_drv_t*) drv_data;

  switch(command) {
    case CMD_SQL_EXEC:
      sql_exec(driver_data, buf, len);
      break;
    default:
      unknown(driver_data, buf, len);
  }
  return 0;
}


static inline int return_error(sqlite3_drv_t *drv, const char *error, ErlDrvTermData **spec, int *terms_count) {
  *spec = (ErlDrvTermData *)calloc(7, sizeof(ErlDrvTermData));
  (*spec)[0] = ERL_DRV_ATOM;
  (*spec)[1] = drv->atom_error;
  (*spec)[2] = ERL_DRV_STRING;
  (*spec)[3] = (ErlDrvTermData)error;
  (*spec)[4] = strlen(error);
  (*spec)[5] = ERL_DRV_TUPLE;
  (*spec)[6] = 2;
  *terms_count = 7;
  return 0;
}

static int sql_exec(sqlite3_drv_t *drv, char *command, int command_size) {

  int result, next_row;
  char *rest = NULL;
  sqlite3_stmt *statement;

  // fprintf(drv->log, "Preexec: %.*s\n", command_size, command);
  // fflush (drv->log);
  result = sqlite3_prepare_v2(drv->db, command, command_size, &statement, (const char **)&rest);
  if(result != SQLITE_OK) { 
    ErlDrvTermData *dataset;
    int term_count;
    return_error(drv, sqlite3_errmsg(drv->db), &dataset, &term_count); 
    driver_output_term(drv->port, dataset, term_count);
    return 0;
  }

  async_sqlite3_command *async_command = (async_sqlite3_command *)calloc(1, sizeof(async_sqlite3_command));
  async_command->driver_data = drv;
  async_command->statement = statement;

  // fprintf(drv->log, "Driver async: %d %p\n", SQLITE_VERSION_NUMBER, async_command->statement);
  // fflush (drv->log);

  if (sqlite3_threadsafe()) {
    drv->async_handle = driver_async(drv->port, &drv->key, sql_exec_async, async_command, sql_free_async);
  } else {
    sql_exec_async(async_command);
    ready_async((ErlDrvData)drv, (ErlDrvThreadData)async_command);
  }
  return 0;
}

static void sql_free_async(void *_async_command)
{
  int i;
  async_sqlite3_command *async_command = (async_sqlite3_command *)_async_command;
  free(async_command->dataset);

  async_command->driver_data->async_handle = 0;

  if (async_command->int64s) {
    开发者_JS百科free(async_command->int64s);
  }
  if (async_command->floats) {
    free(async_command->floats);
  }
  for (i = 0; i < async_command->binaries_count; i++) {
    driver_free_binary(async_command->binaries[i]);
  }
  if(async_command->binaries) {
    free(async_command->binaries);
  }
  if (async_command->statement) {
    sqlite3_finalize(async_command->statement);
  }
  free(async_command);
}


static void sql_exec_async(void *_async_command) {
  async_sqlite3_command *async_command = (async_sqlite3_command *)_async_command;
  ErlDrvTermData *dataset = async_command->dataset;
  int term_count = async_command->term_count;
  int row_count = async_command->row_count;
  sqlite3_drv_t *drv = async_command->driver_data;

  int result, next_row, column_count;
  char *error = NULL;
  char *rest = NULL;
  sqlite3_stmt *statement = async_command->statement;

  sqlite3_int64 *int64s = NULL;
  int int64_count = 0;

  double *floats = NULL;
  int float_count = 0;

  ErlDrvBinary **binaries = NULL;
  int binaries_count = 0;
  int i;


  column_count = sqlite3_column_count(statement);
  dataset = NULL;

  term_count += 2;
  dataset = realloc(dataset, sizeof(*dataset) * term_count);
  dataset[term_count - 2] = ERL_DRV_PORT;
  dataset[term_count - 1] = driver_mk_port(drv->port);

  if (column_count > 0) {
    int base = term_count;
    term_count += 2 + column_count*3 + 1 + 2 + 2 + 2;
    dataset = realloc(dataset, sizeof(*dataset) * term_count);
    dataset[base] = ERL_DRV_ATOM;
    dataset[base + 1] = drv->atom_columns;
    for (i = 0; i < column_count; i++) {
      char *column_name = (char *)sqlite3_column_name(statement, i);
      // fprintf(drv->log, "Column: %s\n", column_name);
      // fflush (drv->log);

      dataset[base + 2 + (i*3)] = ERL_DRV_STRING;
      dataset[base + 2 + (i*3) + 1] = (ErlDrvTermData) column_name;
      dataset[base + 2 + (i*3) + 2] = strlen (column_name);
    }
    dataset[base + 2 + column_count*3 + 0] = ERL_DRV_NIL;
    dataset[base + 2 + column_count*3 + 1] = ERL_DRV_LIST;
    dataset[base + 2 + column_count*3 + 2] = column_count + 1;
    dataset[base + 2 + column_count*3 + 3] = ERL_DRV_TUPLE;
    dataset[base + 2 + column_count*3 + 4] = 2;

    dataset[base + 2 + column_count*3 + 5] = ERL_DRV_ATOM;
    dataset[base + 2 + column_count*3 + 6] = drv->atom_rows;
  }

  // fprintf(drv->log, "Exec: %s\n", sqlite3_sql(statement));
  // fflush (drv->log);

  while ((next_row = sqlite3_step(statement)) == SQLITE_ROW) {

    for (i = 0; i < column_count; i++) {
      // fprintf(drv->log, "Column %d type: %d\n", i, sqlite3_column_type(statement, i));
      // fflush (drv->log);
      switch (sqlite3_column_type(statement, i)) {
        case SQLITE_INTEGER: {
          int64_count++;
          int64s = realloc(int64s, sizeof(sqlite3_int64) * int64_count);
          int64s[int64_count - 1] = sqlite3_column_int64(statement, i);

          term_count += 2;
          dataset = realloc(dataset, sizeof(*dataset) * term_count);
          dataset[term_count - 2] = ERL_DRV_INT64;
          dataset[term_count - 1] = (ErlDrvTermData)&int64s[int64_count - 1];
          printf("Dataset element: %lld\n", *((sqlite3_int64 *) dataset[term_count - 1]));
          break;
        }
        case SQLITE_FLOAT: {
          float_count++;
          floats = realloc(floats, sizeof(double) * float_count);
          floats[float_count - 1] = sqlite3_column_double(statement, i);

          term_count += 2;
          dataset = realloc(dataset, sizeof(*dataset) * term_count);
          dataset[term_count - 2] = ERL_DRV_FLOAT;
          dataset[term_count - 1] = (ErlDrvTermData)&floats[float_count - 1];
          break;
        }
        case SQLITE_BLOB: 
        case SQLITE_TEXT: {
          int bytes = sqlite3_column_bytes(statement, i);
          binaries_count++;
          binaries = realloc(binaries, sizeof(*binaries) * binaries_count);
          binaries[binaries_count - 1] = driver_alloc_binary(bytes);
          binaries[binaries_count - 1]->orig_size = bytes;
          memcpy(binaries[binaries_count - 1]->orig_bytes, sqlite3_column_blob(statement, i), bytes);

          term_count += 4;
          dataset = realloc(dataset, sizeof(*dataset) * term_count);
          dataset[term_count - 4] = ERL_DRV_BINARY;
          dataset[term_count - 3] = (ErlDrvTermData)binaries[binaries_count - 1];
          dataset[term_count - 2] = bytes;
          dataset[term_count - 1] = 0;
          break;
        }
        case SQLITE_NULL: {
          term_count += 2;
          dataset = realloc (dataset, sizeof (*dataset) * term_count);
          dataset[term_count - 2] = ERL_DRV_ATOM;
          dataset[term_count - 1] = drv->atom_null;
          break;
        }
      }
    }
    term_count += 2;
    dataset = realloc(dataset, sizeof(*dataset) * term_count);
    dataset[term_count - 2] = ERL_DRV_TUPLE;
    dataset[term_count - 1] = column_count;

    row_count++;
  }
  async_command->row_count = row_count;
  async_command->int64s = int64s;
  async_command->floats = floats;
  async_command->binaries = binaries;
  async_command->binaries_count = binaries_count;


  if (next_row == SQLITE_BUSY) {
    return_error(drv, "SQLite3 database is busy", &async_command->dataset, &async_command->term_count);
    return;
  }
  if (next_row != SQLITE_DONE) {
    return_error(drv, sqlite3_errmsg(drv->db), &async_command->dataset, &async_command->term_count);
    return;
  }


  if (column_count > 0) {
    term_count += 3;
    dataset = realloc(dataset, sizeof(*dataset) * term_count);
    dataset[term_count - 3] = ERL_DRV_NIL;
    dataset[term_count - 2] = ERL_DRV_LIST;
    dataset[term_count - 1] = row_count + 1;

    term_count += 2;
    dataset = realloc(dataset, sizeof(*dataset) * term_count);
    dataset[term_count - 2] = ERL_DRV_TUPLE;
    dataset[term_count - 1] = 2;


    term_count += 3;
    dataset = realloc(dataset, sizeof(*dataset) * term_count);
    dataset[term_count - 3] = ERL_DRV_NIL;
    dataset[term_count - 2] = ERL_DRV_LIST;
    dataset[term_count - 1] = 3;

  } else if (strcasestr(sqlite3_sql(statement), "INSERT"))  {

    sqlite3_int64 rowid = sqlite3_last_insert_rowid(drv->db);
    term_count += 6;
    dataset = realloc(dataset, sizeof(*dataset) * term_count);
    dataset[term_count - 6] = ERL_DRV_ATOM;
    dataset[term_count - 5] = drv->atom_id;
    dataset[term_count - 4] = ERL_DRV_INT;
    dataset[term_count - 3] = rowid;
    dataset[term_count - 2] = ERL_DRV_TUPLE;
    dataset[term_count - 1] = 2;
  } else {
    term_count += 6;
    dataset = realloc(dataset, sizeof(*dataset) * term_count);
    dataset[term_count - 6] = ERL_DRV_ATOM;
    dataset[term_count - 5] = drv->atom_ok;
    dataset[term_count - 4] = ERL_DRV_INT;
    dataset[term_count - 3] = next_row;
    dataset[term_count - 2] = ERL_DRV_TUPLE;
    dataset[term_count - 1] = 2;
  }

  term_count += 2;
  dataset = realloc(dataset, sizeof(*dataset) * term_count);
  dataset[term_count - 2] = ERL_DRV_TUPLE;
  dataset[term_count - 1] = 2;




  async_command->dataset = dataset;
  async_command->term_count = term_count;
  // fprintf(drv->log, "Total term count: %p %d, rows count: %dx%d\n", statement, term_count, column_count, row_count);
  // fflush (drv->log);
}

static void ready_async(ErlDrvData drv_data, ErlDrvThreadData thread_data)
{
  async_sqlite3_command *async_command = (async_sqlite3_command *)thread_data;
  sqlite3_drv_t *drv = async_command->driver_data;

  int res = driver_output_term(drv->port, async_command->dataset, async_command->term_count);
  // fprintf(drv->log, "Total term count: %p %d, rows count: %d (%d)\n", async_command->statement, async_command->term_count, async_command->row_count, res);
  // fflush (drv->log);
  sql_free_async(async_command);
}


// Unkown Command
static int unknown(sqlite3_drv_t *drv, char *command, int command_size) {
  // Return {error, unkown_command}
  ErlDrvTermData spec[] = {ERL_DRV_ATOM, drv->atom_error,
               ERL_DRV_ATOM, drv->atom_unknown_cmd,
               ERL_DRV_TUPLE, 2};
  return driver_output_term(drv->port, spec, sizeof(spec) / sizeof(spec[0]));
}

The tests output this:

module 'sqlite3_test'
  sqlite3_test: basic_functionality_test...Dataset element: 1
Dataset element: 20
Dataset element: 2000
Dataset element: 2
Dataset element: 30
Dataset element: 2000
*failed*
::error:{assertEqual_failed,
          [{module,sqlite3_test},
           {line,58},
           {expression,"sqlite3 : sql_exec ( ct , \"select * from user;\" )"},
           {expected,
               [{columns,["id","name","age","wage"]},
                {rows,[{1,<<"abby">>,20,2000},{2,<<"marge">>,30,2000}]}]},
           {value,
               [{columns,["id","name","age","wage"]},
                {rows,[{0,<<"abby">>,20,2000},{2,<<"marge">>,30,2000}]}]}]}
  in function sqlite3_test:'-basic_functionality_test/0-fun-5-'/1
  in call from sqlite3_test:basic_functionality_test/0


  sqlite3_test: large_number_test...Dataset element: 4294967298
Dataset element: 4294967298
*failed*
::error:{assertEqual_failed,
          [{module,sqlite3_test},
           {line,141},
           {expression,"rows ( sqlite3 : sql_exec ( ct , Query1 ) )"},
           {expected,[{4294967298,4294967298}]},
           {value,[{4294967296,4294967298}]}]}
  in function sqlite3_test:'-large_number_test/0-fun-0-'/2

So when the first column is an integer, the correct value is placed into dataset, but Erlang receives a different value from the driver. Why could this happen?


The problem happened due to reallocating arrays. When I printed the address of int64s along with their value, I got output similar to

While filling row: (0x8a9c010:4294967298 )
dataset (22 terms):
int64: 0x8a9c010:4294967298
While filling row: (0x8a9bfb8:4294967298 0x8a9bfc0:4294967298 )
dataset (24 terms):
int64: 0x8a9c010:4294967296
int64: 0x8a9bfc0:4294967298
While filling row: (0x8a9bfb8:4294967298 0x8a9bfc0:4294967298 0x8a9bfc8:4294967298 )
dataset (26 terms):
int64: 0x8a9c010:4294967296
int64: 0x8a9bfc0:4294967298
int64: 0x8a9bfc8:4294967298

(The array int64s is printed in parentheses after "while filling row", the values below are taken from the dataset). So, when the array is reallocated, the pointer listed in the dataset points to the old array.

Fixed by switching to a linked list.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜