/* * Copyright (c) 2006-2006 Endace Technology Ltd, Hamilton, New Zealand. * All rights reserved. * * This source code is proprietary to Endace Technology Limited and no part * of it may be redistributed, published or disclosed except as outlined in * the written contract supplied with this product. * * $Id: dagflood_model.c 13559 2010-12-21 00:53:02Z karthik.sharma $ */ /* dagflood headers. */ #include "dagflood_model.h" #include "dagflood_config.h" /* Endace headers. */ #include "dagapi.h" #include "dagnew.h" #include "dagutil.h" #include /* CVS Header. */ static const char* const kCvsHeader __attribute__ ((unused)) = "$Id: dagflood_model.c 13559 2010-12-21 00:53:02Z karthik.sharma $"; static const char* const kRevisionString = "$Revision: 13559 $"; /* Macros and constants */ #define RUN_TIMELIMIT 0 /* Do a time based uContinue */ #define RUN_REPEAT 1 /* Do a repeat uCounters uContinue */ #define BURST_MAX ONE_MEBI #define EIGHT_MEBI 8*ONE_MEBI #define BUFSIZE 256 /* Put all state inside a structure. */ typedef struct dagflood_state_ { #if defined(__FreeBSD__) || defined(__linux__) || defined (__NetBSD__) || (defined(__SVR4) && defined(__sun)) || defined(__APPLE__) pthread_cond_t cond; pthread_mutex_t mutex; FILE * infile; void * base; /* Pointer to the mmap'd file */ void * pos; /* Absolute pointer inside mmap buffer */ int infd; #elif defined(_WIN32) int outfile; char * base; /* Pointer to the mmap'd file */ char * pos; /* Absolute pointer inside mmap buffer */ HANDLE mapping; #endif /* Platform-specific code. */ int dagfd; DagfloodConfigPtr config; int report_now; int stop_now; uint32_t repeat; dagflood_info_t info; DagfloodInfoPtr client_info; uint64_t wbytes; /* Bytes written to stream buffer */ uint64_t old_wbytes; /* Bytes written up to the previous loop */ uint32_t seconds; /* Seconds the application has been running */ struct timeval start; /* Start time */ struct timeval end; /* End time */ uint8_t mmap_failed; /* File mapping failed ? */ int fd; /* File descriptor */ uint32_t size; /* File size */ char tx_buf1[BURST_MAX]; char tx_buf2[BURST_MAX]; } dagflood_state_t, * DagfloodStatePtr; /* Internal routines. */ static DagfloodStatePtr dagflood_state_init(DagfloodConfigPtr config); static void dagflood_state_dispose(DagfloodStatePtr state); static void initialize_dag(DagfloodStatePtr flooder); static void finalize_dag(DagfloodStatePtr flooder); static int init_file(DagfloodStatePtr flooder); static void run_file_copy_bytes(DagfloodStatePtr flooder); static void run_file_commit_bytes(DagfloodStatePtr flooder); static void copy_bytes(DagfloodStatePtr flooder); static void commit_bytes(DagfloodStatePtr flooder); #if defined _WIN32 static void remove_mapping(DagfloodStatePtr flooder); #endif /* Implementation of internal routines. */ static DagfloodStatePtr dagflood_state_init(DagfloodConfigPtr config) { DagfloodStatePtr result = (DagfloodStatePtr) dagutil_malloc(sizeof(*result)); assert(config); memset(result, 0, sizeof(*result)); result->dagfd = -1; result->config = config; #if defined(_WIN32) result->infile = -1; #elif defined (__FreeBSD__) || defined(__linux__) || defined(__NetBSD__) || (defined(__SVR4) && defined(__sun)) || defined(__APPLE__) result->infd = -1; pthread_cond_init(&result->cond, NULL); pthread_mutex_init(&result->mutex, NULL); #endif /* Platform-specific code. */ return result; } static void dagflood_state_dispose(DagfloodStatePtr state) { assert(state); #if defined(__FreeBSD__) || defined(__linux__) || defined (__NetBSD__) || (defined(__SVR4) && defined(__sun)) || defined(__APPLE__) pthread_cond_destroy(&state->cond); pthread_mutex_destroy(&state->mutex); #elif defined(_WIN32) #endif /* Platform-specific code. */ dagflood_config_dispose(state->config); /* Shut down file descriptors. */ if (-1 != state->dagfd) { dag_close(state->dagfd); state->dagfd = -1; } #if defined(__FreeBSD__) || defined(__linux__) || defined (__NetBSD__) || (defined(__SVR4) && defined(__sun)) || defined(__APPLE__) if (state->infile) { fclose(state->infile); } state->infile = NULL; state->infd = -1; #elif defined(_WIN32) close(state->infile); state->infile = -1; #endif /* Platform-specific code. */ dagutil_free(state); } static void initialize_dag(DagfloodStatePtr flooder) { const char* dagname = dagflood_config_get_device(flooder->config); int cached_stream = dagflood_config_get_dagstream(flooder->config); flooder->dagfd = dag_open((char*) dagname); if (flooder->dagfd < 0) { dagutil_panic("dag_open(%s): %s\n", dagname, strerror(errno)); } if (dag_tx_get_stream_count(flooder->dagfd) < 1) { dagutil_panic("dag_tx_get_stream_count(%s): The firmware loaded on this DAG card does not support transmit features.\n", dagname); } if (dag_attach_stream(flooder->dagfd, cached_stream, 0, dagutil_max(BURST_MAX, dagflood_config_get_burst_max(flooder->config))) != 0) { dagutil_panic("dag_attach_stream(%s): %s\n", dagname, strerror(errno)); } if (dag_start_stream(flooder->dagfd, cached_stream) < 0) { dagutil_panic("dag_start_stream(%s): %s\n", dagname, strerror(errno)); } } static void finalize_dag(DagfloodStatePtr flooder) { const char* dagname = dagflood_config_get_device(flooder->config); int cached_stream = dagflood_config_get_dagstream(flooder->config); if (dag_stop_stream(flooder->dagfd, cached_stream) < 0) { dagutil_panic("dag_stop_stream(%s): %s\n", dagname, strerror(errno)); } if (dag_detach_stream(flooder->dagfd, cached_stream) != 0) { dagutil_panic("dag_detach_stream(%s): %s\n", dagname, strerror(errno)); } if (dag_close(flooder->dagfd) < 0) { dagutil_panic("dag_close(%s): %s\n", dagname, strerror(errno)); } } static int init_file(DagfloodStatePtr flooder) { const char * filename = dagflood_config_get_infile_name(flooder->config); struct stat file_stats; #if defined(_WIN32) HANDLE hFile, hMapHandle; #endif /* _WIN32 */ /* Open file. */ #if defined(__FreeBSD__) || defined(__linux__) || (defined(__SVR4) && defined(__sun)) || defined(__APPLE__) flooder->fd = open(filename, O_RDONLY); #elif defined(_WIN32) flooder->fd = _open(filename, _O_RDONLY | _O_BINARY ,_S_IREAD); #else #error Compiling on an unsupported platform - please contact for assistance. #endif /* Platform-specific code. */ if (-1 == flooder->fd) { return 0; } /* Get file size. */ if ((fstat(flooder->fd, &file_stats)) < 0) { return 0; } flooder->size = file_stats.st_size; if (0 == flooder->size) { return 0; } /* Mmap file, so we have it in memory. */ if (dagutil_get_verbosity() > 0) { fprintf(stdout, "caching file... "); } #if defined(__FreeBSD__) || defined(__linux__) || (defined(__SVR4) && defined(__sun)) || defined(__APPLE__) flooder->base = mmap(NULL, flooder->size, PROT_READ, MAP_SHARED, flooder->fd, 0); if (flooder->base == MAP_FAILED) { flooder->mmap_failed = 1; close(flooder->fd); return 0; } #elif defined(_WIN32) /* mmap is done slightly differently in Windows. */ /* Get a mappable handle to the file. */ hFile = (HANDLE)_get_osfhandle(flooder->fd); if (hFile == INVALID_HANDLE_VALUE) dagutil_panic("invalid file descriptor: %s\n", strerror(errno)); hMapHandle = CreateFileMapping( hFile, /* Handle to file */ NULL, /* LPSECURITY_ATTRIBUTES lpAttributes, */ PAGE_READONLY, /* DWORD flProtect, */ 0, /* DWORD dwMaximumSizeHigh, */ (DWORD) flooder->size, /* DWORD dwMaximumSizeLow, */ "dagfloodmap" /* LPCTSTR lpName */ ); if (hMapHandle == NULL) { flooder->mmap_failed = 1; CloseHandle(hMapHandle); _close(flooder->fd); return 0; } /* do the mapping. */ flooder->base = MapViewOfFile( hMapHandle, /* HANDLE hFileMappingObject, */ FILE_MAP_READ, /* DWORD dwDesiredAccess, */ 0, /* DWORD dwFileOffsetHigh, */ 0, /* DWORD dwFileOffsetLow, */ 0 /* default map entire file */ ); if (flooder->base == NULL) { flooder->mmap_failed = 1; CloseHandle(hMapHandle); _close(flooder->fd); return 0; } flooder->mapping = hMapHandle; #else #error Compiling on an unsupported platform - please contact for assistance. #endif if (dagutil_get_verbosity() > 0) { fprintf(stdout, "done.\n"); } /* Update pointer inside file */ flooder->pos = flooder->base; return 1; } static void run_file_copy_bytes(DagfloodStatePtr flooder) { uint32_t cached_burst_microseconds = dagflood_config_get_inter_burst_microseconds(flooder->config); uint32_t cached_burst_max = dagflood_config_get_burst_max(flooder->config); uint8_t cached_run_mode = dagflood_config_get_run_mode(flooder->config); uint8_t cached_flush_tx_buffer = dagflood_config_get_flush_tx_buffer(flooder->config); int cached_dagfd = flooder->dagfd; uint32_t burst_length; /* Main loop */ while (0 == flooder->stop_now) { /* The maximum burst length should not be greater than the one defined * as default nor the remaining bytes in the file buffer. */ uint64_t remaining_bytes = (uint64_t) (flooder->size + flooder->base - flooder->pos); if ((uint64_t) cached_burst_max < remaining_bytes) { burst_length = cached_burst_max; } else { burst_length = (uint32_t) remaining_bytes; } /* Write to dag card (copy, so it's not zero-copy). */ dag_tx_stream_copy_bytes(cached_dagfd, 1, flooder->pos, burst_length); /* Advance offsets and counters. */ flooder->wbytes += burst_length; flooder->pos += burst_length; /* Keep file pointer inside boundaries. */ if (flooder->pos >= flooder->base + flooder->size) { flooder->pos = flooder->base; flooder->repeat--; } /* Should we repeat more times? */ /* We were running dagflood with a repeat number option */ if ((cached_run_mode == RUN_REPEAT) && (flooder->repeat == 0)) { flooder->stop_now = 1; cached_flush_tx_buffer = 1; /* Normal exit, flush buffer. */ } /* Inter burst delay. */ if (cached_burst_microseconds) usleep(cached_burst_microseconds); } /* Wait for the stream to be almost empty */ /* This way we are sure almost all the records are transmitted when we exit. */ if (1 == cached_flush_tx_buffer) { dag_tx_get_stream_space(flooder->dagfd, 1, dag_get_stream_buffer_size(flooder->dagfd, 1) - 8); } } static void run_file_commit_bytes(DagfloodStatePtr flooder) { uint32_t cached_burst_microseconds = dagflood_config_get_inter_burst_microseconds(flooder->config); uint32_t cached_burst_max = dagflood_config_get_burst_max(flooder->config); uint8_t cached_run_mode = dagflood_config_get_run_mode(flooder->config); uint8_t cached_flush_tx_buffer = dagflood_config_get_flush_tx_buffer(flooder->config); int cached_dagfd = flooder->dagfd; uint32_t burst_length; void * record; #if defined(_WIN32) MSG msg; #endif /* _WIN32 */ /* Main loop */ while (0 == flooder->stop_now) { /* The maximum burst length should not be greater than the one defined * as default nor the remaining bytes in the file buffer. */ uint64_t remaining_bytes = (uint64_t)(flooder->size + flooder->base - flooder->pos); if ((uint64_t) cached_burst_max < remaining_bytes) { burst_length = cached_burst_max; } else { burst_length = (uint32_t) remaining_bytes; } /* Get space for writing. */ record = dag_tx_get_stream_space(cached_dagfd, 1, burst_length); /* Copy bytes. */ memcpy(record, flooder->pos, burst_length); /* Commit data. */ record = dag_tx_stream_commit_bytes(cached_dagfd, 1, burst_length); /* Advance offsets and counters. */ flooder->wbytes += burst_length; flooder->pos += burst_length; /* Keep file pointer inside boundaries. */ if (flooder->pos >= flooder->base + flooder->size) { flooder->pos = flooder->base; flooder->repeat--; } /* Should we repeat more times? */ /* We were running dagflood with a repeat number option */ if ((cached_run_mode == RUN_REPEAT) && (flooder->repeat == 0)) { flooder->stop_now = 1; cached_flush_tx_buffer = 1; /* Normal exit, flush buffer. */ } #if defined(_WIN32) while (PeekMessage(&msg, NULL, 0, 0, PM_QS_POSTMESSAGE|PM_REMOVE) != 0) { DispatchMessage(&msg); } #endif /*_WIN32 */ /* Inter burst delay. */ if (cached_burst_microseconds) usleep(cached_burst_microseconds); } /* Wait for the stream to be almost empty. * This way we are sure almost all the records are transmitted when we exit. */ if (1 == cached_flush_tx_buffer) { record = dag_tx_get_stream_space(flooder->dagfd, 1, dag_get_stream_buffer_size(flooder->dagfd, 1) - 8); } } static void copy_bytes(DagfloodStatePtr flooder) { uint32_t cached_burst_microseconds = dagflood_config_get_inter_burst_microseconds(flooder->config); uint8_t cached_run_mode = dagflood_config_get_run_mode(flooder->config); uint8_t cached_flush_tx_buffer = dagflood_config_get_flush_tx_buffer(flooder->config); FILE *fp = NULL; uint32_t byte_cnt; uint32_t chunk_size; void *tx_buf[2]; uint8_t flip = 0; flooder->stop_now = 0; tx_buf[0] = flooder->tx_buf1; tx_buf[1] = flooder->tx_buf2; fp = fopen(dagflood_config_get_infile_name(flooder->config), "rb"); if (fp == NULL) { exit(EXIT_FAILURE); } chunk_size = dagutil_min(dagflood_config_get_burst_max(flooder->config), BURST_MAX); byte_cnt = fread(tx_buf[flip], sizeof(char), chunk_size, fp); if (byte_cnt < chunk_size) { if (0 == feof(fp)) { dagutil_panic("file error\n"); } } /* Main loop */ while (0 == flooder->stop_now) { /* Write to dag card (copy, so it's not zero-copy). */ dag_tx_stream_copy_bytes (flooder->dagfd, 1, (void*)tx_buf[flip], byte_cnt); flip = !flip; /* Advance counters. */ flooder->wbytes += byte_cnt; if (feof(fp)) { flooder->repeat--; if (flooder->repeat) { if (fseek(fp,0L,SEEK_SET)) { dagutil_panic("file error\n"); } } } /* Should we repeat more times? */ /* We were running dagflood with a repeat number option */ if ((cached_run_mode == RUN_REPEAT) && (flooder->repeat == 0)) { flooder->stop_now = 1; cached_flush_tx_buffer = 1; /* Normal exit, flush buffer. */ fclose(fp); } else { byte_cnt = fread(tx_buf[flip], sizeof(char), chunk_size , fp); if (byte_cnt < chunk_size) { if (!feof(fp)) dagutil_panic("file error\n"); } } /* Inter burst delay. */ if (cached_burst_microseconds) usleep(cached_burst_microseconds); } /* Wait for the stream to be almost empty */ /* This way we are sure almost all the records are transmitted when we exit. */ if (1 == cached_flush_tx_buffer) { dag_tx_get_stream_space(flooder->dagfd, 1, dag_get_stream_buffer_size(flooder->dagfd, 1) - 8); } } static void commit_bytes(DagfloodStatePtr flooder) { uint32_t cached_burst_microseconds = dagflood_config_get_inter_burst_microseconds(flooder->config); uint32_t cached_burst_max = dagflood_config_get_burst_max(flooder->config); uint8_t cached_run_mode = dagflood_config_get_run_mode(flooder->config); uint8_t cached_flush_tx_buffer = dagflood_config_get_flush_tx_buffer(flooder->config); const char * cached_infile_name = dagflood_config_get_infile_name(flooder->config); FILE * fp = NULL; uint32_t byte_cnt; uint32_t chunk_size; void * tx_buf[2]; uint8_t flip = 0; void * record; #if defined(_WIN32) MSG msg; #endif /* _WIN32 */ tx_buf[0] = flooder->tx_buf1; tx_buf[1] = flooder->tx_buf2; fp = fopen(cached_infile_name, "rb"); if (fp == NULL) { exit(EXIT_FAILURE); } chunk_size = dagutil_min(cached_burst_max, BURST_MAX); byte_cnt = fread(tx_buf[flip], sizeof(char), chunk_size , fp); if (byte_cnt < chunk_size) { if (0 == feof(fp)) { dagutil_panic("file error\n"); } } /* Main loop */ flooder->stop_now = 0; while (0 == flooder->stop_now) { /* Get space for writing. */ record = dag_tx_get_stream_space(flooder->dagfd, 1, byte_cnt); /* Copy bytes. */ memcpy(record, tx_buf[flip], byte_cnt); /* Commit data. */ record = dag_tx_stream_commit_bytes(flooder->dagfd, 1, byte_cnt); flip = !flip; /* Advance counters. */ flooder->wbytes += byte_cnt; if (feof(fp)) { flooder->repeat--; if (flooder->repeat) { if (fseek(fp, 0L, SEEK_SET)) { dagutil_panic("file error\n"); } } } /* Should we repeat more times? */ /* We were running dagflood with a repeat number option */ if ((cached_run_mode == RUN_REPEAT) && (flooder->repeat == 0)) { flooder->stop_now = 1; cached_flush_tx_buffer = 1; /* Normal exit, flush buffer. */ fclose(fp); } else { byte_cnt = fread(tx_buf[flip], sizeof(char), chunk_size , fp); if (byte_cnt < chunk_size) { if (0 == feof(fp)) { dagutil_panic("file error\n"); } } } #if defined(_WIN32) while (PeekMessage(&msg, NULL, 0, 0, PM_QS_POSTMESSAGE|PM_REMOVE) != 0) { DispatchMessage(&msg); } #endif /*_WIN32 */ /* Inter burst delay. */ if (cached_burst_microseconds) usleep(cached_burst_microseconds); } /* Wait for the stream to be almost empty. * This way we are sure almost all the records are transmitted when we exit. */ if (1 == cached_flush_tx_buffer) { record = dag_tx_get_stream_space(flooder->dagfd, 1, dag_get_stream_buffer_size(flooder->dagfd, 1) - 8); } } #if defined(_WIN32) static void remove_mapping(file_t * file) { UnmapViewOfFile(file->base); CloseHandle(file->mapping); } #endif /* _WIN32 */ /* Public routines. */ DagfloodPtr dagflood_create(DagfloodConfigPtr config) { DagfloodStatePtr state = NULL; assert(config); state = dagflood_state_init(config); /* Open the input file. */ if (0 == init_file(state)) { dagflood_state_dispose(state); return NULL; } return (DagfloodPtr) state; } void dagflood_dispose(DagfloodPtr flooder) { DagfloodStatePtr state = (DagfloodStatePtr) flooder; assert(state); dagflood_state_dispose(state); } void dagflood_get_status(DagfloodPtr flooder, DagfloodInfoPtr info) { DagfloodStatePtr state = (DagfloodStatePtr) flooder; assert(state); assert(info); if (NULL == info) { return; } #if defined(__FreeBSD__) || defined(__linux__) || defined (__NetBSD__) || (defined(__SVR4) && defined(__sun)) || defined(__APPLE__) /* Acquire mutex. */ pthread_mutex_lock(&state->mutex); #elif defined(_WIN32) #endif /* Platform-specific code. */ state->client_info = info; state->report_now = 1; while (1 == state->report_now) { #if defined(__FreeBSD__) || defined(__linux__) || defined (__NetBSD__) || (defined(__SVR4) && defined(__sun)) || defined(__APPLE__) /* Wait on condition variable for main thread to update view state. */ pthread_cond_wait(&state->cond, &state->mutex); #elif defined(_WIN32) #endif /* Platform-specific code. */ } state->client_info = NULL; #if defined(__FreeBSD__) || defined(__linux__) || defined (__NetBSD__) || (defined(__SVR4) && defined(__sun)) || defined(__APPLE__) /* Release mutex. */ pthread_mutex_unlock(&state->mutex); #elif defined(_WIN32) #endif /* Platform-specific code. */ } int dagflood_transmit(DagfloodPtr flooder) { DagfloodStatePtr state = (DagfloodStatePtr) flooder; uint8_t cached_run_mode = 0; assert(state); cached_run_mode = dagflood_config_get_run_mode(state->config); /* Init dag session and device */ initialize_dag(state); /* Record start time */ gettimeofday(&state->start, 0); /* Every run type uses a diferent API interface. * This is only for educational and testing purposes. * You really only need one of these two interfaces. */ switch (cached_run_mode) { case 1: if (0 == state->mmap_failed) { run_file_commit_bytes(state); } else { commit_bytes(state); } break; case 2: if (0 == state->mmap_failed) { run_file_copy_bytes(state); } else { copy_bytes(state); } break; default: dagutil_panic("uContinue type not supported (%d)\n", cached_run_mode); break; } /* Record end time */ gettimeofday(&state->end, 0); #if defined(_WIN32) /* Clean up the file structures used if running on Windows */ if (0 == state->mmap_failed) { remove_mapping(state); } #endif /* WIN32 */ /* Close dag session and device */ finalize_dag(state); return 0; } void dagflood_stop(DagfloodPtr flooder) { } .